import asyncio
import aio_pika
from config import RABBITMQ_URI


async def direct_consumer(queue_name: str, consumer_label: str):
    """
    Direct Exchange 消费者：监听指定队列，处理对应路由键的消息
    :param queue_name: 要监听的队列名称（与生产者中定义的队列一致）
    :param consumer_label: 消费者标签（用于区分不同类型的消息处理器）
    """
    # 1. 建立与 RabbitMQ 的连接（robust 模式自动重连，提升稳定性）
    connection = await aio_pika.connect_robust(RABBITMQ_URI)
    # 2. 创建通信信道（所有操作通过信道执行，减少 TCP 连接开销）
    channel = await connection.channel()

    # 3. 开启公平调度：确保消费者处理完 1 条消息后，再接收下 1 条
    # 避免「快消费者空闲、慢消费者堆积」的不均衡问题
    await channel.set_qos(prefetch_count=1)

    # 4. 声明要监听的队列（与生产者中 queue_bindings 定义的队列完全一致）
    # 若队列不存在（未执行 setup_direct_exchange），会报错提醒初始化
    queue = await channel.declare_queue(
        queue_name,
        durable=True,  # 与生产者一致：队列持久化（重启不丢失）
        auto_delete=False  # 队列不自动删除（即使无消费者也保留）
    )

    # 5. 定义消息处理逻辑（核心：根据队列类型处理对应级别消息）
    async def on_message_received(message: aio_pika.IncomingMessage):
        # async with 上下文：自动完成消息确认（处理完后告知 RabbitMQ 删除消息）
        # 若处理过程中崩溃，消息会重新回到队列，避免丢失
        async with message.process():
            # 解码消息体（生产者用 utf-8 编码，此处对应解码）
            message_content = message.body.decode("utf-8")
            # Print key information (for debugging and log tracking)
            print(f"[{consumer_label} Consumer] Received message:")
            print(f"  Queue name: {queue_name}")
            print(f"  Message content: {message_content}")
            print(f"  Message routing key: {message.routing_key}")  # Verify routing key match
            print(f"  Processing time: {asyncio.get_running_loop().time():.2f}s\n")

            # 模拟不同级别消息的处理耗时（业务场景可替换为实际逻辑）
            if "error" in queue_name:
                # 错误消息：可能需要重试、告警，耗时更长
                await asyncio.sleep(2)
            elif "warning" in queue_name:
                # 警告消息：可能需要记录日志、轻量处理
                await asyncio.sleep(1)
            elif "info" in queue_name:
                # 信息/调试消息：快速处理，仅记录
                await asyncio.sleep(0.5)

    # 6. 启动队列监听：将消息处理函数绑定到队列
    consumer_tag = f"direct_{consumer_label.lower().replace(' ', '_')}_{queue_name}"
    await queue.consume(on_message_received, consumer_tag=consumer_tag)
    # Print startup log, indicating consumer is ready
    print(f"[{consumer_label} Consumer] Started, listening to queue: {queue_name} (tag: {consumer_tag})\n")

    # 7. 保持消费者运行（无限期阻塞，直到手动停止程序）
    # 若不阻塞，协程会立即结束，消费者会断开连接
    await asyncio.Future()


async def start_all_direct_consumers(queue_prefix="demo.direct.queue-"):
    """
    启动所有 Direct Exchange 对应的消费者
    与生产者 setup_direct_exchange 中的 queue_bindings 完全对应
    """
    # Define list of consumers to start (queue name + consumer label)
    consumers = [
        # Error queue: handles messages with routing key "error"
        direct_consumer(f"{queue_prefix}error", "Error Level"),
        # Warning queue: handles messages with routing key "warning"
        direct_consumer(f"{queue_prefix}warning", "Warning Level"),
        # Info queue: handles messages with routing keys "info" and "debug"
        direct_consumer(f"{queue_prefix}info", "Info/Debug Level")
    ]

    # 同时启动所有消费者（并发运行，互不阻塞）
    await asyncio.gather(*consumers)


if __name__ == "__main__":
    # 启动所有消费者（需先执行 setup_direct_exchange 初始化队列）
    asyncio.run(start_all_direct_consumers())
